package com.bsj.travel.mqtt.cluster;

/**
 * 下行消息，发送到设备
 *
 * @author L.cm
 */
//@Slf4j
//public class MqttMessageDownReceiverRedis {
//	private final IMessageSerializer messageSerializer;
//	private final MqttServer mqttServer;
//
//	public MqttMessageDownReceiverRedis(IMessageSerializer messageSerializer, MqttServer mqttServer) {
//		this.messageSerializer = messageSerializer;
//		this.mqttServer = mqttServer;
//	}
//
//	@RStreamListener(name = RedisKeys.REDIS_CHANNEL_DOWN_KEY,
//            messageModel = MessageModel.BROADCASTING,
//            readRawBytes = true
//	)
//	public void mqttMessageDownReceiver(MapRecord<String, String, byte[]> mapRecord) {
//		// 手动序列化和反序列化，避免 redis 序列化不一致问题
//		Map<String, byte[]> recordValue = mapRecord.getValue();
//		recordValue.forEach((key, messageBody) -> {
//			// 手动序列化和反序列化，避免 redis 序列化不一致问题
//			Message mqttMessage = messageSerializer.deserialize(messageBody);
//			if (mqttMessage == null) {
//				return;
//			}
//			// 下行消息，发送到设备
//			String topic = mqttMessage.getTopic();
//			if (StringUtil.isBlank(topic)) {
//				log.error("Mqtt down stream topic is blank.");
//				return;
//			}
//			String clientId = mqttMessage.getClientId();
//			byte[] payload = mqttMessage.getPayload();
//			MqttQoS mqttQoS = MqttQoS.valueOf(mqttMessage.getQos());
//			boolean retain = mqttMessage.isRetain();
//			if (StringUtil.isBlank(clientId)) {
//				mqttServer.publishAll(topic, payload, mqttQoS, retain);
//			} else {
//				mqttServer.publish(clientId, topic, payload, mqttQoS, retain);
//			}
//		});
//	}
//
//}
